Sentinel Bulk Logs Export.ipynb (498 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"source": [
"## Overview\n",
"This Notebook takes a KQL query and breaks it into batches that fit within the limits of the Azure Monitor API. This allows us to export more than the default 30,000 record/64MB limits experienced when using the native interface. The export will run the batches in parallel and write the data to local disk in the format specified in the OUTPUT_FORMAT parameter."
],
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"## 1. Install Dependencies\n",
"Run this cell to install the required Phython libraries."
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"import sys\n",
"!{sys.executable} -m pip install azure-monitor-query azure-identity pandas tqdm"
],
"outputs": [],
"execution_count": null,
"metadata": {
"gather": {
"logged": 1735980653518
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "markdown",
"source": [
"## 2. Set Parameters\n",
"Modify the below parameters as necessary and then run the below cell."
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"from datetime import datetime, timedelta, timezone\n",
"\n",
"#Required parameters:\n",
"START_TIME = datetime(2024, 12, 10, tzinfo=timezone.utc) #Start time of the time range for the query.\n",
"END_TIME = datetime(2024, 12, 15, tzinfo=timezone.utc) #End time of the time range for the query.\n",
"QUERY = \"SecurityEvent | project TimeGenerated, Account\" #KQL query to run\n",
"\n",
"#If needed, change which Log Analytics workspace to use:\n",
"USE_DEFAULT_LAW_ID = True #If present, use the Log Analytics workspace ID that is present in the config.json file which gets created by Sentinel Notebooks.\n",
"LAW_ID = \"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx\" #Log Analytics workspace ID to use if config.json file is not present or USE_DEFAULT_LAW_ID is set to False.\n",
"\n",
"#Optional parameters used for performance and output tuning:\n",
"THREADS = 2 #Number of jobs to run in parallel. Typically, this should match the number of cores of the VM. Because the Azure Monitor API can only run 5 concurrent queries at a time, there are diminishing returns after a certain point.\n",
"AUTO_BATCH = True #Attempts to automatically detect optimial batch size (time range) to use when breaking up the query.\n",
"BATCH_SIZE = timedelta(hours=6) #If AUTO_BATCH is set to False, this batch size (time range) will be used to break up the query.\n",
"MIN_BATCH_SIZE = timedelta(minutes=1) #If the data returned cannot fit within this time range, we skip and move to the next batch.\n",
"OUTPUT_DIRECTORY = \"./law_export\" #Directory where results will be stored. A new directory gets created for each run.\n",
"OUTPUT_FILE_PREFIX = \"query_results\" #Prefix used for the data files containing the query results.\n",
"OUTPUT_FORMAT = 'CSV' #File format used to the store the query results on disk. CSV or PARQUET values are supported.\n",
"OUTPUT_COMBINE_FILES = True #Combine all job data files into a single file.\n",
"TIMEOUT = 3 #Number of minutes allowed before query times out. 10 minutes is max."
],
"outputs": [],
"execution_count": null,
"metadata": {
"gather": {
"logged": 1736036638278
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "markdown",
"source": [
"## 3. Export Data\n",
"Run the below cell to start the export process. Data will be written to local files in the directory specified in the OUTPUT_DIRECTORY parameter."
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"from datetime import datetime, timedelta, timezone\n",
"import pandas as pd\n",
"import time\n",
"from azure.monitor.query import LogsQueryClient, LogsQueryStatus\n",
"from azure.core.exceptions import HttpResponseError\n",
"from azure.identity import DefaultAzureCredential\n",
"import logging\n",
"import os\n",
"import glob \n",
"import json\n",
"from multiprocessing import Pool, Manager\n",
"from tqdm import tqdm\n",
"\n",
"class time_range_class:\n",
" def __init__(self, name, start_time, end_time):\n",
" self.name = name\n",
" self.start_time = start_time\n",
" self.end_time = end_time\n",
"\n",
"def get_time_ranges(start_time=datetime.now(), end_time=datetime.now() - timedelta(hours=24), number_of_ranges=5):\n",
" ranges = []\n",
" interval = (end_time - start_time) / number_of_ranges\n",
" delta = timedelta(microseconds=0)\n",
"\n",
" index = 0\n",
" for i in range(number_of_ranges):\n",
" range_name = \"Job \" + str(index) \n",
" range_start = end_time - ((i + 1) * interval)\n",
" range_end = (end_time - (i * interval)) - delta\n",
" time_range = time_range_class(range_name, range_start, range_end)\n",
" ranges.append(time_range)\n",
" index += 1\n",
" delta = timedelta(microseconds=1)\n",
"\n",
" return ranges\n",
"\n",
"def read_config_values(file_path):\n",
" try:\n",
" with open(file_path) as json_file:\n",
" if json_file:\n",
" json_config = json.load(json_file)\n",
" return (json_config[\"workspace_id\"])\n",
" except:\n",
" return None\n",
"\n",
"def write_to_file(df, export_path, prefix, format):\n",
" match format:\n",
" case 'PARQUET':\n",
" path = os.path.join(export_path, f\"{prefix}.parquet\")\n",
" df.to_parquet(path)\n",
" case 'CSV':\n",
" path = os.path.join(export_path, f\"{prefix}.csv\")\n",
" df.to_csv(path, index=False) \n",
" \n",
"def get_batch_size(query, law_id, start_time, end_time):\n",
" batch_query = (f\"{query}\"\n",
" \"| summarize NumberOfBatchesBytes = 38400000 / avg(estimate_data_size(*)), NumberOfBatchesRows = count()\"\n",
" \"| project NumberOfBatchesBytes = todecimal(NumberOfBatchesRows / NumberOfBatchesBytes), NumberOfBatchesRows = todecimal(NumberOfBatchesRows) / todecimal(450000)\"\n",
" \"| project NumberOfBatches = round(iff(NumberOfBatchesBytes > NumberOfBatchesRows, NumberOfBatchesBytes, NumberOfBatchesRows), 2)\"\n",
" \"| project NumberOfBatches = iif(NumberOfBatches < toreal(1), toreal(1), NumberOfBatches)\")\n",
"\n",
" response = client.query_workspace(workspace_id=law_id, query=batch_query, timespan=(start_time, end_time))\n",
"\n",
" if response.status == LogsQueryStatus.SUCCESS:\n",
" data = response.tables\n",
" else:\n",
" error = response.partial_error\n",
" data = response.partial_data\n",
" raise Exception(error.details[0][\"innererror\"])\n",
" for table in data:\n",
" df = pd.DataFrame(data=table.rows, columns=table.columns)\n",
" \n",
" return df['NumberOfBatches'].iloc[0]\n",
"\n",
"def export_log_analytics_data(\n",
" law_id: str,\n",
" query: str,\n",
" start_time: datetime = None,\n",
" end_time: datetime = None,\n",
" batch_size: timedelta = timedelta(hours=4),\n",
" job_name: str = None,\n",
" queue = None,\n",
" min_batch_size: timedelta = timedelta(minutes=15),\n",
" client: LogsQueryClient = None,\n",
" export_path = '',\n",
" export_prefix = 'query_results',\n",
" auto_batch = True,\n",
" export_format: str = 'CSV',\n",
" timeout: int = 10,\n",
" delay: int = 0,\n",
" max_retries: int = 5,\n",
" export_to_file: bool = True,\n",
" json_depth: int = 10,\n",
" ):\n",
"\n",
" time_range: timedelta = end_time - start_time\n",
" error_count: int = 0\n",
" initial_batch_size: timedelta = batch_size\n",
" batch_count: timedelta = timedelta()\n",
" current_count: int = 0\n",
" percent_complete: int = 0\n",
" stop_time: datetime = start_time\n",
" time_range_format: str = 'd\\dh\\hm\\ms\\s'\n",
" time_format: str = \"%m-%d-%Y %H-%M-%S\"\n",
" runs_without_error_count: int = 0\n",
" loop_done: bool = False\n",
" rows_returned: int = 0\n",
" results = [] \n",
" \n",
"\n",
" logging.basicConfig(filename=f\"{export_path}/{job_name}.log\",\n",
" filemode='a',\n",
" format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',\n",
" datefmt='%H:%M:%S',\n",
" level=logging.INFO)\n",
" \n",
" logging.FileHandler(f\"{export_path}/{job_name}.log\")\n",
"\n",
" logging.info(f\"{job_name}: Starting new job.\")\n",
"\n",
" if auto_batch == True: \n",
" try:\n",
" batch_size = time_range / get_batch_size(query, law_id, start_time, end_time)\n",
" except Exception as err:\n",
" logging.error(f\"{job_name}: Unhandled Error: {type(err)} {err}\")\n",
" return ({'job_name': job_name, 'status': 'error'})\n",
"\n",
" if batch_size > time_range: batch_size = time_range\n",
"\n",
" while error_count <= max_retries:\n",
" try:\n",
" while loop_done == False:\n",
" \n",
" if batch_size < initial_batch_size and runs_without_error_count > 5:\n",
" batch_size *= 2\n",
" logging.info(f\"{job_name}: Increasing batch size to {batch_size}.\")\n",
" \n",
" start_time = end_time - batch_size\n",
"\n",
" if start_time <= stop_time:\n",
" start_time = stop_time\n",
" batch_size = end_time - start_time\n",
" loop_done = True\n",
"\n",
" logging.info(f\"{job_name}: Running query between {start_time.strftime(time_format)} and {end_time.strftime(time_format)}.\")\n",
"\n",
" response = client.query_workspace(workspace_id=law_id, query=query, timespan=(start_time, end_time), timeout=timeout)\n",
"\n",
" if response.status == LogsQueryStatus.SUCCESS:\n",
" data = response.tables\n",
" else:\n",
" error = response.partial_error\n",
" data = response.partial_data\n",
" raise Exception(error.details[0][\"innererror\"])\n",
" for table in data:\n",
" df = pd.DataFrame(data=table.rows, columns=table.columns)\n",
" write_to_file(df, export_path, (f\"{export_prefix}_{start_time.strftime(time_format)}\"), export_format)\n",
" \n",
" batch_count += batch_size\n",
" percent_complete_previous = percent_complete\n",
" percent_complete = round((batch_count / time_range) * 100)\n",
" logging.info(f\"{job_name}: Received {df.shape[0]} rows of data and written to disk. Percent Complete: {percent_complete}\")\n",
" queue.put({'job_name': job_name, 'progress_update': (percent_complete - percent_complete_previous), 'rows_returned': int(df.shape[0])})\n",
" rows_returned += int(df.shape[0])\n",
"\n",
" runs_without_error_count += 1\n",
" end_time = start_time + timedelta(microseconds=-1)\n",
" time.sleep(delay)\n",
"\n",
" logging.info(f\"{job_name}: Finished exporting {rows_returned} records from Log Analytics. Percent Complete: 100\")\n",
" queue.put({'job_name': job_name, 'progress_update': (100 - percent_complete), 'rows_returned': 0})\n",
" logging.Handler.close\n",
"\n",
" return ({'job_name': job_name, 'status': 'success', 'rows_returned_total': rows_returned})\n",
" except Exception as err:\n",
" if \"Response ended prematurely\" in str(err):\n",
" logging.warning(f\"{job_name}: Response ended prematurely, retrying. Message {type(err)} {err}\") \n",
" elif (\"Maximum response size of 100000000 bytes exceeded\" in str(err) \n",
" or 'The results of this query exceed the set limit of 64000000 bytes' in str(err) \n",
" or 'The results of this query exceed the set limit of 500000 records' in str(err)):\n",
" runs_without_error_count = 0\n",
" if batch_size == min_batch_size:\n",
" logging.error(f\"{job_name}: Results cannot be returned in the specified minimum batch size. Skipping batch between {start_time.strftime(time_format)} and {end_time.strftime(time_format)}. Message: {type(err)} {err}\")\n",
" batch_count += batch_size\n",
" end_time = start_time + timedelta(microseconds=-1)\n",
" loop_done = False\n",
" else:\n",
" batch_size = batch_size / 2\n",
" if batch_size < min_batch_size:\n",
" batch_size = min_batch_size\n",
" logging.info(f\"{job_name}: Reduced batch size to: {batch_size}. Message: {type(err)} {err}\")\n",
" loop_done = False\n",
" else:\n",
" logging.error(f\"Unhandled Error: {type(err)} {err}\")\n",
" error_count += 1\n",
" if error_count > max_retries:\n",
" logging.error(f\"{job_name}: Max number of retries reached, exiting.\")\n",
" return ({'job_name': job_name, 'status': 'error'})\n",
" finally:\n",
" logging.Handler.close\n",
" \n",
"\n",
"time_format: str = \"%m-%d-%Y %H-%M-%S\"\n",
"if not os.path.exists(OUTPUT_DIRECTORY): os.makedirs(OUTPUT_DIRECTORY)\n",
"job_directory = f\"{OUTPUT_DIRECTORY}/{datetime.now().strftime(time_format)}\"\n",
"os.mkdir(job_directory)\n",
"\n",
"ranges = get_time_ranges(start_time=START_TIME, end_time=END_TIME, number_of_ranges=THREADS )\n",
"\n",
"completed_jobs = []\n",
"failed_jobs = []\n",
"last_queue_time = datetime.now()\n",
"\n",
"def log_result(result):\n",
" global completed_jobs\n",
" if result['status'] == 'success':\n",
" completed_jobs.append(result)\n",
" else:\n",
" print(f\"{result['job_name']} has failed. Please check log file for details.\")\n",
" failed_jobs.append(result)\n",
"\n",
"def log_error(error):\n",
" print(error)\n",
"\n",
"def cleanup():\n",
" pbar.clear()\n",
" pbar.close()\n",
" pool.close()\n",
" pool.join()\n",
"\n",
"workspace_id = read_config_values('config.json')\n",
"\n",
"if workspace_id == None or USE_DEFAULT_LAW_ID == False:\n",
" if LAW_ID != \"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx\":\n",
" law_id = LAW_ID\n",
" else:\n",
" raise Exception(\"Please specify a valid Log Analyics workspace ID in the Parameters cell.\")\n",
"else:\n",
" law_id = workspace_id\n",
"\n",
"manager = Manager()\n",
"queue = manager.Queue()\n",
"\n",
"credential = DefaultAzureCredential()\n",
"client = LogsQueryClient(credential)\n",
"\n",
"pool = Pool()\n",
"\n",
"pbar = tqdm(total=THREADS * 100, leave=True, position=0)\n",
"pbar.set_description(f\"Splitting query into {THREADS} jobs for parallel processing\", refresh=True)\n",
"\n",
"for i in ranges:\n",
" result = pool.apply_async(export_log_analytics_data, [law_id, QUERY, i.start_time, i.end_time, BATCH_SIZE, i.name, queue, MIN_BATCH_SIZE, client, job_directory, OUTPUT_FILE_PREFIX, AUTO_BATCH, OUTPUT_FORMAT, TIMEOUT], callback=log_result, error_callback=log_error) \n",
"\n",
"while (len(completed_jobs) + len(failed_jobs)) < THREADS or not queue.empty():\n",
" if not queue.empty():\n",
" item = queue.get()\n",
" last_queue_time = datetime.now()\n",
" pbar.update(item['progress_update'])\n",
" if datetime.now() - last_queue_time > timedelta(minutes=TIMEOUT):\n",
" pbar.set_description(f\"No input received from running job(s) for more than {TIMEOUT} minutes, check logs for errors. Exiting.\", refresh=True)\n",
" if len(completed_jobs) > 0: print(f\"Completed jobs: {', '.join([item['job_name'] for item in completed_jobs])}.\")\n",
" cleanup()\n",
" break\n",
" time.sleep(1) \n",
"\n",
"def concat(csv_file):\n",
" return pd.read_csv(csv_file, low_memory=False) \n",
"\n",
"if len(completed_jobs) > 0:\n",
" pbar.set_description(f\"Export of {sum([item['rows_returned_total'] for item in completed_jobs])} records to {job_directory} complete\", refresh=True)\n",
" if OUTPUT_COMBINE_FILES == True:\n",
" csv_files = glob.glob(job_directory + '/*.{}'.format('csv'))\n",
" df = pd.concat(map(concat, csv_files), ignore_index=True)\n",
" df.to_csv(job_directory + '/' + OUTPUT_FILE_PREFIX + '_FullExport.csv', index=False)\n",
"else:\n",
" pbar.set_description(f\"No jobs completed successfully. Please check log files in {job_directory} for details.\")\n",
"\n",
"cleanup()"
],
"outputs": [],
"execution_count": null,
"metadata": {
"gather": {
"logged": 1736036665532
},
"jupyter": {
"outputs_hidden": false,
"source_hidden": true
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "markdown",
"source": [
"## 4. Cleanup\n",
"Run the below cell to delete all run data including logs and data files."
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"import shutil\n",
"\n",
"try:\n",
" shutil.rmtree(OUTPUT_DIRECTORY)\n",
" print(\"Data has been deleted.\")\n",
"except Exception as err:\n",
" print(f\"Error deleting data: {err}\")\n"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": true,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"gather": {
"logged": 1736035615644
}
}
}
],
"metadata": {
"kernel_info": {
"name": "python38-azureml"
},
"kernelspec": {
"name": "python38-azureml",
"language": "python",
"display_name": "Python 3.8 - AzureML"
},
"language_info": {
"name": "python",
"version": "3.10.11",
"mimetype": "text/x-python",
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"pygments_lexer": "ipython3",
"nbconvert_exporter": "python",
"file_extension": ".py"
},
"microsoft": {
"host": {
"AzureML": {
"notebookHasBeenCompleted": true
}
},
"ms_spell_check": {
"ms_spell_check_language": "en"
}
},
"nteract": {
"version": "nteract-front-end@1.0.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}